package com.atuguigu.flink.Day07;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;
import org.apache.flink.shaded.guava18.com.google.common.hash.Funnels;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

public class Example1 {
    // Count the number of unique visitors (UV) using a Bloom filter
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env
                .readTextFile("D:\\ideaproject\\maven\\FlinkBigData1021\\src\\main\\resources\\data\\UserBehavior.csv")
                .map(
                        new MapFunction<String, UserBehavior>() {
                            @Override
                            public UserBehavior map(String value) throws Exception {
                                String arr[]=value.split(",");
                                return new UserBehavior(
                                        arr[0],arr[1],arr[2],arr[3],Long.parseLong(arr[4]) * 1000L
                                );
                            }
                        }
                )
                .filter(r->r.behaviorType.equals("pv"))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<UserBehavior>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
                            @Override
                            public long extractTimestamp(UserBehavior element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                )
                .map(
                        new MapFunction<UserBehavior, Tuple2<String,Long>>() {
                            @Override
                            public Tuple2<String, Long> map(UserBehavior value) throws Exception {
                                return Tuple2.of("key",Long.parseLong(value.userId));
                            }
                        }
                )
                .keyBy(r->r.f0)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .aggregate(new CountAgg(),new WindowResult())
                .print();


        env.execute();

    }

    public static class WindowResult extends ProcessWindowFunction<Long,String,String,TimeWindow>{

        @Override
        public void process(String s, Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
            out.collect("窗口 " + new Timestamp(context.window().getStart()) + " => " + new Timestamp(context.window().getEnd()) + " 的UV数据是：" + elements.iterator().next());
        }
    }




    public static class  CountAgg implements AggregateFunction<Tuple2<String,Long>,Tuple2<Long,BloomFilter<Long>>,Long>{


        @Override
        public Tuple2<Long, BloomFilter<Long>> createAccumulator() {
            return Tuple2.of(0L,BloomFilter.create(Funnels.longFunnel(),10000,0.01));
        }

        @Override
        public Tuple2<Long, BloomFilter<Long>> add(Tuple2<String, Long> value, Tuple2<Long, BloomFilter<Long>> accumulator) {
            if(!accumulator.f1.mightContain(value.f1)){
                accumulator.f0 += 1L;
                accumulator.f1.put(value.f1);
            }
            return accumulator;
        }

        @Override
        public Long getResult(Tuple2<Long, BloomFilter<Long>> accumulator) {
            return accumulator.f0;
        }

        @Override
        public Tuple2<Long, BloomFilter<Long>> merge(Tuple2<Long, BloomFilter<Long>> a, Tuple2<Long, BloomFilter<Long>> b) {
            return null;
        }
    }

    public static  class UserBehavior{

        public String userId;
        public String itemId;
        public String categoryId;
        public String behaviorType;
        public Long timestamp;

        public UserBehavior() {
        }

        public UserBehavior(String userId, String itemId, String categoryId, String behaviorType, Long timestamp) {
            this.userId = userId;
            this.itemId = itemId;
            this.categoryId = categoryId;
            this.behaviorType = behaviorType;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "UserBehavior{" +
                    "userId='" + userId + '\'' +
                    ", itemId='" + itemId + '\'' +
                    ", categoryId='" + categoryId + '\'' +
                    ", behaviorType='" + behaviorType + '\'' +
                    ", timestamp=" + new Timestamp(timestamp) +
                    '}';
        }
    }
}
